package io.sundial.executor.impl;

import io.sundial.coordination.CallbackException;
import io.sundial.coordination.CallbackStatus;
import io.sundial.coordination.Coordinator;
import io.sundial.coordination.node.Node;
import io.sundial.core.Callback;
import io.sundial.core.context.Context;
import io.sundial.core.event.EventListener;
import io.sundial.core.lifecycle.exception.DestroyingException;
import io.sundial.core.lifecycle.exception.InitializingException;
import io.sundial.discovery.Discoverer;
import io.sundial.discovery.DiscovererEvent;
import io.sundial.discovery.Discovering;
import io.sundial.discovery.event.ChangedEvent;
import io.sundial.discovery.exception.DiscoveringException;
import io.sundial.engine.impl.EventEngine;
import io.sundial.executor.Executor;
import io.sundial.job.Job;
import io.sundial.job.JobDefinition;
import io.sundial.job.JobKey;
import io.sundial.job.JobParameter;
import io.sundial.util.MultipleEnumeration;

import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;

/**
 * 抽象的作业发现执行器
 *
 * @author Payne 646742615@qq.com
 * 2018/12/20 17:03
 */
public abstract class DiscoveringExecutor extends EventEngine implements Executor, Discovering {
    protected static final String JOBS_ROOT = SPRT + "jobs";

    private final Callback<Coordinator.ObtainResult, CallbackException> jobDefinitionObtainCallback = new JobDefinitionObtainCallback();
    private final Callback<Coordinator.CreateResult, CallbackException> jobDefinitionCreateCallback = new JobDefinitionCreateCallback();
    private final Callback<Coordinator.UpdateResult, CallbackException> jobDefinitionUpdateCallback = new JobDefinitionUpdateCallback();
    private final Callback<Coordinator.CreateResult, CallbackException> jobAssociationCreateCallback = new JobAssociationCreateCallback();
    private final Callback<Coordinator.DeleteResult, CallbackException> jobAssociationDeleteCallback = new JobAssociationDeleteCallback();
    private final Callback<Coordinator.UpdateResult, CallbackException> jobAssociationUpdateCallback = new JobAssociationUpdateCallback();
    private final Callback<Coordinator.ObtainResult, CallbackException> jobAssociationObtainForDeleteCallback = new JobAssociationObtainForDeleteCallback();
    private final Callback<Coordinator.ObtainResult, CallbackException> jobAssociationObtainForUpdateCallback = new JobAssociationObtainForUpdateCallback();

    private final EventListener<DiscovererEvent> discovererEventListener = new DiscovererEventListener();
    private Set<Discoverer> discoverers;

    @Override
    protected void initializing(Context context) throws InitializingException {
        super.initializing(context);

        // 如果没有设置就从上下文中找到所有的。
        if (discoverers == null || discoverers.isEmpty()) {
            discoverers = new ConcurrentSkipListSet<>(new DiscovererComparator());
            Map<String, Discoverer> map = context.fetch(Discoverer.class);
            Collection<Discoverer> collection = map.values();
            for (Discoverer discoverer : collection) {
                discoverer.initialize(context);
                discoverers.add(discoverer);
            }
        }
        try {
            for (Discoverer discoverer : discoverers) {
                discoverer.addEventListener(discovererEventListener);
            }
            discovererEventListener.onListened(new ChangedEvent());
        } catch (Exception e) {
            throw new InitializingException(e);
        }
    }

    @Override
    protected void destroying() throws DestroyingException {
        super.destroying();

        for (Discoverer discoverer : discoverers) {
            discoverer.removeEventListener(discovererEventListener);
            discoverer.destroy();
        }
        discoverers.clear();
        discoverers = null;
    }

    protected Job discover(String jobName, String jobGroup) throws DiscoveringException {
        for (Discoverer discoverer : discoverers) {
            try {
                return discoverer.discover(jobName, jobGroup);
            } catch (DiscoveringException ignored) {
                /* ignored */
            }
        }
        throw new DiscoveringException("jobGroup: " + jobGroup + ", jobName: " + jobName);
    }

    protected Enumeration<Job> jobs() {
        List<Enumeration<Job>> enumerations = new ArrayList<>();
        for (Discoverer discoverer : discoverers) {
            enumerations.add(discoverer.jobs());
        }
        return new MultipleEnumeration<>(enumerations);
    }

    @Override
    public boolean install(Discoverer discoverer) {
        return discoverers.add(discoverer);
    }

    @Override
    public boolean uninstall(Discoverer discoverer) {
        return discoverers.remove(discoverer);
    }

    public Set<Discoverer> getDiscoverers() {
        return discoverers;
    }

    public void setDiscoverers(Set<Discoverer> discoverers) {
        this.discoverers = discoverers;
    }

    private static class DiscovererComparator implements Comparator<Discoverer> {

        @Override
        public int compare(Discoverer a, Discoverer b) {
            return Long.compare(a.hashCode(), b.hashCode());
        }
    }

    private class DiscovererEventListener implements EventListener<DiscovererEvent> {
        private final Map<JobKey, JobDefinition> definitions = new HashMap<>();

        @Override
        public synchronized void onListened(DiscovererEvent event) throws Exception {
            Map<JobKey, JobDefinition> map = new HashMap<>(definitions);
            definitions.clear();
            Enumeration<Job> enumeration = jobs();
            while (enumeration.hasMoreElements()) {
                Job job = enumeration.nextElement();

                String jobName = job.name();
                if (jobName == null) {
                    logger.error("job name of job class {} is null", job.getClass());
                    continue;
                }
                if (jobName.contains("/")) {
                    logger.error("job name of job class {} must not contains '/'", job.getClass());
                    continue;
                }

                String jobGroup = job.group();
                if (jobGroup == null) {
                    logger.error("job group of job class {} is null", job.getClass());
                    continue;
                }
                if (jobGroup.contains("/")) {
                    logger.error("job group of job class {} must not contains '/'", job.getClass());
                    continue;
                }

                String jobVersion = job.version();
                if (jobVersion == null) {
                    logger.error("job version of job class {} is null", job.getClass());
                    continue;
                }
                if (!jobVersion.matches("\\d+(\\.\\d+)*")) {
                    logger.error("job version of job class {} is valid which must matches regex '\\d+(\\.\\d+)*'", job.getClass());
                    continue;
                }

                int jobShardable = job.shardable();
                String jobType = job.type();
                Map<String, JobParameter> jobParameters = job.parameters();
                String jobDescription = job.description();

                JobKey jobKey = new JobKey(jobName, jobGroup);
                map.remove(jobKey);
                JobDefinition jobDefinition = new JobDefinition(
                        jobName,
                        jobGroup,
                        jobType,
                        jobVersion,
                        jobShardable,
                        jobParameters,
                        jobDescription
                );

                definitions.put(jobKey, jobDefinition);

                String path = JOBS_ROOT + SPRT + jobGroup + SPRT + jobName;
                coordinator.obtain(path, jobDefinitionObtainCallback, jobDefinition);
            }
            for (JobDefinition jobDefinition : map.values()) {
                String jobName = jobDefinition.getName();
                String jobGroup = jobDefinition.getGroup();
                String path = JOBS_ROOT + SPRT + jobGroup + SPRT + jobName + SPRT + name;
                coordinator.obtain(path, jobAssociationObtainForDeleteCallback, jobDefinition);
            }
        }
    }

    private class JobDefinitionObtainCallback implements Callback<Coordinator.ObtainResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.ObtainResult result, CallbackException exception) throws Exception {
            // 作业定义存在
            if (success) {
                Node node = result.getNode();
                String path = node.getPath();
                byte[] oldData = node.getData();
                JobDefinition oldDefinition = unmarshal(oldData, JobDefinition.class);
                JobDefinition newDefinition = (JobDefinition) result.getContext();
                byte[] newData = marshall(newDefinition);
                // 之前的作业定义版本比当前的低，更新之！
                if (oldDefinition.compareTo(newDefinition) < 0) {
                    int oldVersion = node.getVersion();
                    coordinator.update(path, newData, oldVersion, jobDefinitionUpdateCallback, newDefinition);
                }
                // 否则在其子节点中创建该执行器临时节点
                else {
                    coordinator.create(path + SPRT + name, newData, false, false, jobAssociationCreateCallback, newDefinition);
                }
            }
            // 作业定义不存在
            else if (exception.getCode() == CallbackStatus.CODE_NO_NODE) {
                JobDefinition definition = (JobDefinition) exception.getContext();
                String name = definition.getName();
                String group = definition.getGroup();
                String path = JOBS_ROOT + SPRT + group + SPRT + name;
                byte[] jobData = marshall(definition);
                coordinator.create(path, jobData, true, false, jobDefinitionCreateCallback, definition);
            }
            // 其他错误
            else {
                logger.error("error occurred while obtaining job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }

    }

    private class JobDefinitionCreateCallback implements Callback<Coordinator.CreateResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.CreateResult result, CallbackException exception) throws Exception {
            // 作业定义创建成功
            if (success) {
                // 创建临时节点
                String path = result.getPath();
                JobDefinition definition = (JobDefinition) result.getContext();
                byte[] data = marshall(definition);
                coordinator.create(path + SPRT + name, data, false, false, jobAssociationCreateCallback, definition);
            }
            // 作业定义已存在
            else if (exception.getCode() == CallbackStatus.CODE_NODE_EXISTS) {
                // 尝试获取并比较
                JobDefinition definition = (JobDefinition) exception.getContext();
                String group = definition.getGroup();
                String name = definition.getName();
                coordinator.obtain(JOBS_ROOT + SPRT + group + SPRT + name, jobDefinitionObtainCallback, definition);
            }
            // 其他错误
            else {
                logger.error("error occurred while creating job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }
    }

    private class JobDefinitionUpdateCallback implements Callback<Coordinator.UpdateResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.UpdateResult result, CallbackException exception) throws Exception {
            // 更新作业定义成功
            if (success) {
                // 创建临时节点
                String path = result.getPath();
                JobDefinition definition = (JobDefinition) result.getContext();
                byte[] data = marshall(definition);
                coordinator.create(path + SPRT + name, data, false, false, jobAssociationCreateCallback, definition);
            }
            // 作业定义版本失效
            else if (exception.getCode() == CallbackStatus.CODE_BAD_VERSION) {
                // 尝试获取并比较
                JobDefinition definition = (JobDefinition) exception.getContext();
                String group = definition.getGroup();
                String name = definition.getName();
                coordinator.obtain(JOBS_ROOT + SPRT + group + SPRT + name, jobDefinitionObtainCallback, definition);
            }
            // 其他错误
            else {
                logger.error("error occurred while updating job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }
    }

    private class JobAssociationCreateCallback implements Callback<Coordinator.CreateResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.CreateResult result, CallbackException exception) throws Exception {
            // 创建成功
            if (success) {
                logger.info("success creating association with job definition {}", result.getContext());
            }
            // 节点存在
            else if (exception.getCode() == CallbackStatus.CODE_NODE_EXISTS) {
                // 更新作业定义
                JobDefinition jobDefinition = (JobDefinition) exception.getContext();
                String jobName = jobDefinition.getName();
                String jobGroup = jobDefinition.getGroup();
                coordinator.obtain(
                        JOBS_ROOT + SPRT + jobGroup + SPRT + jobName + SPRT + name,
                        jobAssociationObtainForUpdateCallback,
                        jobDefinition
                );
            }
            // 其他错误
            else {
                logger.error("error occurred while creating association with job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }
    }

    private class JobAssociationDeleteCallback implements Callback<Coordinator.DeleteResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.DeleteResult result, CallbackException exception) throws Exception {
            if (success) {
                logger.info("success deleting association with job definition {}", result.getContext());
            } else if (exception.getCode() == CallbackStatus.CODE_BAD_VERSION) {
                JobDefinition jobDefinition = (JobDefinition) exception.getContext();
                String jobName = jobDefinition.getName();
                String jobGroup = jobDefinition.getGroup();
                coordinator.obtain(
                        JOBS_ROOT + SPRT + jobGroup + SPRT + jobName + SPRT + name,
                        jobAssociationObtainForDeleteCallback,
                        jobDefinition
                );
            } else {
                logger.warn("error occurred while deleting association with job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }
    }

    private class JobAssociationUpdateCallback implements Callback<Coordinator.UpdateResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.UpdateResult result, CallbackException exception) throws Exception {
            if (success) {
                logger.info("success updating association with job definition {}", result.getContext());
            } else if (exception.getCode() == CallbackStatus.CODE_BAD_VERSION) {
                // 更新作业定义
                JobDefinition jobDefinition = (JobDefinition) exception.getContext();
                String jobName = jobDefinition.getName();
                String jobGroup = jobDefinition.getGroup();
                coordinator.obtain(
                        JOBS_ROOT + SPRT + jobGroup + SPRT + jobName + SPRT + name,
                        jobAssociationObtainForUpdateCallback,
                        jobDefinition
                );
            } else {
                logger.warn("error occurred while updating association with job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }
    }

    private class JobAssociationObtainForDeleteCallback implements Callback<Coordinator.ObtainResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.ObtainResult result, CallbackException exception) throws Exception {
            if (success) {
                Node node = result.getNode();
                JobDefinition newDefinition = (JobDefinition) result.getContext();
                JobDefinition oldDefinition = unmarshal(node.getData(), JobDefinition.class);
                if (newDefinition.compareTo(oldDefinition) >= 0) {
                    coordinator.delete(node.getPath(), node.getVersion(), true, true, jobAssociationDeleteCallback, newDefinition);
                }
            } else {
                logger.error("error occurred while obtaining association with job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }
    }

    private class JobAssociationObtainForUpdateCallback implements Callback<Coordinator.ObtainResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.ObtainResult result, CallbackException exception) throws Exception {
            if (success) {
                Node node = result.getNode();
                JobDefinition newDefinition = (JobDefinition) result.getContext();
                JobDefinition oldDefinition = unmarshal(node.getData(), JobDefinition.class);
                if (oldDefinition.compareTo(newDefinition) < 0) {
                    byte[] data = marshall(newDefinition);
                    coordinator.update(node.getPath(), data, node.getVersion(), jobAssociationUpdateCallback, newDefinition);
                }
            } else {
                logger.error("error occurred while obtaining association with job definition {} with code {}", exception.getContext(), exception.getCode());
            }
        }
    }
}
